搜索 K
Appearance
博客正在加载中...
Appearance
安装好 RabbitMQ 后,下一步就是使用了
我们将用 Java 编写两个程序:
并介绍 Java API 中的一些细节。
我们使用生产者,发消息给 MQ
然后消费者从队列中取出消息

我们新建一个 Maven 工程,并引入依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency> 引入日志框架:
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
我们先声明一个队列的名字,将来后期用该队列存储信息:
package com.peterjxl.rabbitmq.demo;
public class Producer {
public static final String QUEUE_NAME = "hello";
} 然后我们写一个 main 方法来发送信息。有点类似 Mybatis,我们不直接创建连接,而是用工厂模式:
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root123"); 然后我们创建一个连接:
Connection connection = factory.newConnection(); 之前我们说过,一个 Connection 是客户端和 MQ 的 TCP 连接,为了避免频繁创建,我们是使用信道的。这里我们使用 connection 对象创建信道:
Channel channel = connection.createChannel(); 下一步应该就是配置交换机和队列了,但为了方便学习,我们先不使用交换机,而是直接创建一个队列:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);参数说明:
下一步就是发送消息了:
String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成"); 参数说明:

我们点击 hello,可以看到该队列的详情:ready 的意思是有一个消息,已经准备好被消费了

完整代码:
package com.peterjxl.rabbitmq.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成");
}
}
同样的,我们也是创建 connection 和 channel,然后获取消息,完整代码:
package com.peterjxl.rabbitmq.demo;
import com.rabbitmq.client.*;
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到消息:" + message);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("接收消息被中断");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}关键方法:
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);参数说明:
为此,在调用该方法之前我们定义了 2 个回调函数:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到消息:" + message);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("接收消息被中断");
};运行结果:
接收到消息:Hello World 同时在控制台也看到消息被清零了:

本项目已将源码上传到 Gitee 或 GitHub 上。并且创建了分支 demo1,读者可以通过切换分支来查看本文的示例代码